Skip to content

Spring Cloud Data Flow

初体验

  1. 下载可执行包。

    必要:

    非必要:

  2. 启动。

    sh
    $ java -jar spring-cloud-dataflow-server-2.8.5.jar

    还可以指定 Maven 仓库路径(默认当前用户目录下的 .m2/repository),后面发布应用需要:

    sh
    $ java -jar spring-cloud-dataflow-server-2.8.5.jar --maven.localRepository=C:\Users\Pan\.m2\repository
  3. 进入控制面板。

    访问 http://localhost:9393/dashboard/ 进入控制面板。

  4. 准备一个普通的 Maven 可执行 JAR。

  5. 在控制面板中添加 Application

    Type 选择 task

    Uri 填写 maven://study.helloworld:data-flow-sample:jar:0.0.1-SNAPSHOT

  6. 在控制面板中创建 Task

    选择刚才添加的应用。

  7. 启动这个 Task

安装

手动安装

下载服务器jar文件

使用如下命令下载Spring Cloud Data Flow Server和shell:

sh
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-server/2.10.2/spring-cloud-dataflow-server-2.10.3.jar

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.10.2/spring-cloud-dataflow-shell-2.10.3.jar

执行以下命令下载Skipper:

sh
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-skipper-server/2.9.2/spring-cloud-skipper-server-2.9.2.jar

安装消息传递中间件

这些指令要求RabbitMQ与Skipper、Spring Cloud Data Flow服务器和Shell运行在同一台机器上。

安装和运行Kafka,使用如下命令:

sh
$ tar -zxvf kafka_2.13-3.5.0.tgz -C /usr/local/
$ cd /usr/local/kafka_2.13-3.5.0

# Start the ZooKeeper service
$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

# Start the Kafka broker service
$ nohup bin/kafka-server-start.sh config/server.properties &

启动服务器jar

Skipper

在下载Skipper的目录下,使用java -jar运行服务器,如下所示:

sh
nohup java -jar spring-cloud-skipper-server-2.9.2.jar > skipper.log &

要指定远程Maven仓库地址,使用--maven.remote-repositories.repo1.url=https://repo1属性,如下所示:

sh
nohup java -jar spring-cloud-skipper-server-2.9.2.jar --maven.remote-repositories.repo1.url=http://maven.aliyun.com/nexus/content/groups/public/ > skipper.log &
Dataflow

在下载Data Flow的目录下,使用java -jar运行服务器,如下所示:

sh
nohup java -jar spring-cloud-dataflow-server-2.10.3.jar > dataflow.log &

如果Skipper和Data Flow服务器不在同一台主机上运行,则将spring.cloud.skipper.client.serverUri配置属性设置为Skipper的位置,示例如下:

sh
nohup java -jar spring-cloud-dataflow-server-2.10.3.jar --spring.cloud.skipper.client.serverUri=https://192.51.100.1:7577/api &
Shell

如果你想使用Spring Cloud Data Flow shell,用下面的命令启动它:

sh
java -jar spring-cloud-dataflow-shell-2.10.3.jar

如果数据流服务器和shell不在同一台主机上运行,您也可以使用shell中的dataflow config Server命令将shell指向数据流服务器的URL,如下所示:

sh
server-unknown:>dataflow config server https://198.51.100.0
Successfully targeted https://198.51.100.0

或者,您可以传入--dataflow.uri命令行选项。shell的--help命令行选项显示了可用的选项。

访问数据流仪表板

现在可以导航到Spring Cloud Data Flow Dashboard。在浏览器中,导航到Spring Cloud Data Flow Dashboard URL

概念

应用类型

长生命周期应用

寿命较长的应用程序需要连续运行。如果应用程序停止,平台负责重新启动它。

Spring Cloud Stream框架提供了一个编程模型来简化连接到公共消息传递系统的消息驱动微服务应用程序的编写。您可以编写与特定中间件无关的核心业务逻辑。要使用的中间件是通过添加Spring Cloud Stream Binder库作为应用程序的依赖项来确定的。

Data Flow服务器委托Skipper服务器部署长期存在的应用程序。

带有源、处理器和接收器的流

Spring Cloud Stream定义了绑定接口的概念,它将消息交换模式封装在代码中,即应用程序的输入和输出是什么。Spring Cloud Stream提供了几个绑定接口,这些接口对应于以下常见的消息交换契约:

  • 源(Source):将消息发送到目的地的消息生成器。
  • 接收器(Sink):从目的地读取消息的消息使用者。
  • 处理器(Processor):源和接收的组合。处理器使用来自目的地的消息,并生成要发送到另一个目的地的消息。

这三种类型的应用程序通过使用源、处理器和接收器来描述正在注册的应用程序的类型,从而在数据流中注册。

下面的示例显示了注册http源(侦听http请求并将http有效载荷发送到目的地的应用程序)和日志接收器(从目的地消费并记录接收到的消息的应用程序)的shell语法:

sh
dataflow:>app register --name http --type source --uri maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.0.RELEASE
Successfully registered application 'source:http'

dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.1.0.RELEASE
Successfully registered application 'sink:log'

通过在Data Flow中注册http和log,你可以使用stream Pipeline DSL创建一个流定义,它使用管道和过滤器语法,如下例所示:

sh
dataflow:>stream create --name httpStream --definition "http | log"

http | log中的管道符号表示源输出到接收输入的连接。Data Flow在部署流时设置适当的属性,以便源可以通过消息传递中间件与接收通信。

短生命周期应用

短寿命应用程序运行一段时间(通常是几分钟到几小时),然后终止。它们的运行可能基于一个时间表(例如,每个工作日早上6点)或响应一个事件(例如,一个文件被放入FTP服务器)。

Spring Cloud Task框架允许您开发一个短寿命的微服务,该微服务记录短寿命应用程序的生命周期事件(如开始时间、结束时间和退出代码)。

任务应用程序通过使用名称task来描述应用程序的类型,从而在Data Flow中注册。

下面的例子展示了注册时间戳任务(打印当前时间并退出的应用程序)的shell语法:

sh
dataflow:> app register --name timestamp --type task --uri maven://org.springframework.cloud.task.app:timestamp-task:2.1.0.RELEASE

通过引用任务的名称来创建任务定义,如下面的示例所示:

sh
dataflow:> task create tsTask --definition "timestamp"

Spring Batch框架可能是编写短期应用程序的Spring开发人员想到的。Spring Batch提供了比Spring Cloud Task更丰富的功能集,建议在处理大量数据时使用。用例可能是读取许多CSV文件,转换每一行数据,并将转换后的每一行写入数据库。Spring Batch提供了自己的数据库模式,其中包含关于Spring Batch作业执行的更丰富的信息集。Spring Cloud Task与Spring Batch集成,因此,如果Spring Cloud Task应用程序定义了Spring Batch作业,则会创建Spring Cloud Task和Spring Batch运行表之间的链接。

使用Spring Batch的任务以前面所示的相同方式注册和创建。

Spring Cloud Data Flow服务器将任务启动到平台。

流处理

流处理被定义为在没有交互或中断的情况下处理无限数量的数据。流处理的业务案例包括:

  • 实时信用卡欺诈检测或预测分析
  • 用于可操作分析的近实时业务数据处理

Spring Cloud Data Flow中的流处理在架构上被实现为一组独立的事件驱动流应用程序,这些应用程序使用可选的消息传递中间件(例如RabbitMQ或Apache Kafka)进行连接。独立应用程序的集合在运行时聚集在一起,构成流数据管道。管道可以是线性的,也可以是非线性的,这取决于应用程序之间的数据流。

消息中间件

已部署的流应用程序通过消息传递中间件产品进行通信。我们提供了预先构建的流应用程序,可以通过RabbitMQ或Kafka进行通信,您可以使用它与各种数据产品集成。

Spring Cloud Stream

对于Spring开发人员,我们建议使用Spring Cloud Stream框架编写自定义流应用程序。Spring Cloud Stream允许您轻松构建与共享消息传递系统相连的高度可扩展的事件驱动微服务。

作为一名开发人员,您可以专注于开发应用程序的业务逻辑,同时将底层API复杂性和带有消息代理的连接性样板委托给Spring Cloud Stream。

在高层次上,流应用程序可以通过消息传递中间件生成、处理或使用事件。

下一步

如果您对使用预构建的应用程序来创建流数据管道感兴趣,请参阅流入门指南。

如果你对使用Spring Cloud stream编写和部署自定义流处理应用程序感兴趣,请参阅stream开发者指南。

预先构建的应用程序

Spring团队提供并支持一系列预打包的应用程序,您可以使用这些应用程序组装各种数据集成和处理管道,并支持Spring Cloud data Flow开发、学习和实验。

开始入门

所有预打包的流应用程序:

  • 作为Apache Maven构件或Docker映像可用。
  • 使用RabbitMQ或Apache Kafka。
  • 支持通过Prometheus和InfluxDB进行监控。
  • 包含UI中使用的应用程序属性的元数据和shell中的代码完成。

您可以通过使用Data Flow UI或shell注册流和任务应用程序。

可以使用app register命令单独注册应用程序,也可以使用app import命令批量注册应用程序。

对于流,取决于你是使用Kafka还是RabbitMQ,你可以通过使用它们各自的url来注册应用程序:

Kafka

RabbitMQ

当您使用数据流UI时,预填充包括如下图所示的链接:

在Data Flow Shell中,您可以批量导入和注册应用程序,如下例所示:

sh
dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest

Released under the MIT License.